1 /*
2 * Copyright (C) 2007 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkNotNull;
20
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
27
28 import javax.annotation.Nullable;
29
30 /**
31 * An abstract implementation of the {@link ListenableFuture} interface. This
32 * class is preferable to {@link java.util.concurrent.FutureTask} for two
33 * reasons: It implements {@code ListenableFuture}, and it does not implement
34 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
35 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
36 * tasks to a {@link ListeningExecutorService}.)
37 *
38 * <p>This class implements all methods in {@code ListenableFuture}.
39 * Subclasses should provide a way to set the result of the computation through
40 * the protected methods {@link #set(Object)} and
41 * {@link #setException(Throwable)}. Subclasses may also override {@link
42 * #interruptTask()}, which will be invoked automatically if a call to {@link
43 * #cancel(boolean) cancel(true)} succeeds in canceling the future.
44 *
45 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
46 * with concurrency issues and guarantee thread safety.
47 *
48 * <p>The state changing methods all return a boolean indicating success or
49 * failure in changing the future's state. Valid states are running,
50 * completed, failed, or cancelled.
51 *
52 * <p>This class uses an {@link ExecutionList} to guarantee that all registered
53 * listeners will be executed, either when the future finishes or, for listeners
54 * that are added after the future completes, immediately.
55 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
56 * are not necessarily executed in the order in which they were added. (If a
57 * listener is added after the Future is complete, it will be executed
58 * immediately, even if earlier listeners have not been executed. Additionally,
59 * executors need not guarantee FIFO execution, or different listeners may run
60 * in different executors.)
61 *
62 * @author Sven Mawson
63 * @since 1.0
64 */
65 public abstract class AbstractFuture<V> implements ListenableFuture<V> {
66
67 /** Synchronization control for AbstractFutures. */
68 private final Sync<V> sync = new Sync<V>();
69
70 // The execution list to hold our executors.
71 private final ExecutionList executionList = new ExecutionList();
72
73 /**
74 * Constructor for use by subclasses.
75 */
76 protected AbstractFuture() {}
77
78 /*
79 * Improve the documentation of when InterruptedException is thrown. Our
80 * behavior matches the JDK's, but the JDK's documentation is misleading.
81 */
82 /**
83 * {@inheritDoc}
84 *
85 * <p>The default {@link AbstractFuture} implementation throws {@code
86 * InterruptedException} if the current thread is interrupted before or during
87 * the call, even if the value is already available.
88 *
89 * @throws InterruptedException if the current thread was interrupted before
90 * or during the call (optional but recommended).
91 * @throws CancellationException {@inheritDoc}
92 */
93 @Override
94 public V get(long timeout, TimeUnit unit) throws InterruptedException,
95 TimeoutException, ExecutionException {
96 return sync.get(unit.toNanos(timeout));
97 }
98
99 /*
100 * Improve the documentation of when InterruptedException is thrown. Our
101 * behavior matches the JDK's, but the JDK's documentation is misleading.
102 */
103 /**
104 * {@inheritDoc}
105 *
106 * <p>The default {@link AbstractFuture} implementation throws {@code
107 * InterruptedException} if the current thread is interrupted before or during
108 * the call, even if the value is already available.
109 *
110 * @throws InterruptedException if the current thread was interrupted before
111 * or during the call (optional but recommended).
112 * @throws CancellationException {@inheritDoc}
113 */
114 @Override
115 public V get() throws InterruptedException, ExecutionException {
116 return sync.get();
117 }
118
119 @Override
120 public boolean isDone() {
121 return sync.isDone();
122 }
123
124 @Override
125 public boolean isCancelled() {
126 return sync.isCancelled();
127 }
128
129 @Override
130 public boolean cancel(boolean mayInterruptIfRunning) {
131 if (!sync.cancel(mayInterruptIfRunning)) {
132 return false;
133 }
134 executionList.execute();
135 if (mayInterruptIfRunning) {
136 interruptTask();
137 }
138 return true;
139 }
140
141 /**
142 * Subclasses can override this method to implement interruption of the
143 * future's computation. The method is invoked automatically by a successful
144 * call to {@link #cancel(boolean) cancel(true)}.
145 *
146 * <p>The default implementation does nothing.
147 *
148 * @since 10.0
149 */
150 protected void interruptTask() {
151 }
152
153 /**
154 * Returns true if this future was cancelled with {@code
155 * mayInterruptIfRunning} set to {@code true}.
156 *
157 * @since 14.0
158 */
159 protected final boolean wasInterrupted() {
160 return sync.wasInterrupted();
161 }
162
163 /**
164 * {@inheritDoc}
165 *
166 * @since 10.0
167 */
168 @Override
169 public void addListener(Runnable listener, Executor exec) {
170 executionList.add(listener, exec);
171 }
172
173 /**
174 * Subclasses should invoke this method to set the result of the computation
175 * to {@code value}. This will set the state of the future to
176 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
177 * state was successfully changed.
178 *
179 * @param value the value that was the result of the task.
180 * @return true if the state was successfully changed.
181 */
182 protected boolean set(@Nullable V value) {
183 boolean result = sync.set(value);
184 if (result) {
185 executionList.execute();
186 }
187 return result;
188 }
189
190 /**
191 * Subclasses should invoke this method to set the result of the computation
192 * to an error, {@code throwable}. This will set the state of the future to
193 * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
194 * state was successfully changed.
195 *
196 * @param throwable the exception that the task failed with.
197 * @return true if the state was successfully changed.
198 */
199 protected boolean setException(Throwable throwable) {
200 boolean result = sync.setException(checkNotNull(throwable));
201 if (result) {
202 executionList.execute();
203 }
204 return result;
205 }
206
207 /**
208 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
209 * private subclass to hold the synchronizer. This synchronizer is used to
210 * implement the blocking and waiting calls as well as to handle state changes
211 * in a thread-safe manner. The current state of the future is held in the
212 * Sync state, and the lock is released whenever the state changes to
213 * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
214 *
215 * <p>To avoid races between threads doing release and acquire, we transition
216 * to the final state in two steps. One thread will successfully CAS from
217 * RUNNING to COMPLETING, that thread will then set the result of the
218 * computation, and only then transition to COMPLETED, CANCELLED, or
219 * INTERRUPTED.
220 *
221 * <p>We don't use the integer argument passed between acquire methods so we
222 * pass around a -1 everywhere.
223 */
224 static final class Sync<V> extends AbstractQueuedSynchronizer {
225
226 private static final long serialVersionUID = 0L;
227
228 /* Valid states. */
229 static final int RUNNING = 0;
230 static final int COMPLETING = 1;
231 static final int COMPLETED = 2;
232 static final int CANCELLED = 4;
233 static final int INTERRUPTED = 8;
234
235 private V value;
236 private Throwable exception;
237
238 /*
239 * Acquisition succeeds if the future is done, otherwise it fails.
240 */
241 @Override
242 protected int tryAcquireShared(int ignored) {
243 if (isDone()) {
244 return 1;
245 }
246 return -1;
247 }
248
249 /*
250 * We always allow a release to go through, this means the state has been
251 * successfully changed and the result is available.
252 */
253 @Override
254 protected boolean tryReleaseShared(int finalState) {
255 setState(finalState);
256 return true;
257 }
258
259 /**
260 * Blocks until the task is complete or the timeout expires. Throws a
261 * {@link TimeoutException} if the timer expires, otherwise behaves like
262 * {@link #get()}.
263 */
264 V get(long nanos) throws TimeoutException, CancellationException,
265 ExecutionException, InterruptedException {
266
267 // Attempt to acquire the shared lock with a timeout.
268 if (!tryAcquireSharedNanos(-1, nanos)) {
269 throw new TimeoutException("Timeout waiting for task.");
270 }
271
272 return getValue();
273 }
274
275 /**
276 * Blocks until {@link #complete(Object, Throwable, int)} has been
277 * successfully called. Throws a {@link CancellationException} if the task
278 * was cancelled, or a {@link ExecutionException} if the task completed with
279 * an error.
280 */
281 V get() throws CancellationException, ExecutionException,
282 InterruptedException {
283
284 // Acquire the shared lock allowing interruption.
285 acquireSharedInterruptibly(-1);
286 return getValue();
287 }
288
289 /**
290 * Implementation of the actual value retrieval. Will return the value
291 * on success, an exception on failure, a cancellation on cancellation, or
292 * an illegal state if the synchronizer is in an invalid state.
293 */
294 private V getValue() throws CancellationException, ExecutionException {
295 int state = getState();
296 switch (state) {
297 case COMPLETED:
298 if (exception != null) {
299 throw new ExecutionException(exception);
300 } else {
301 return value;
302 }
303
304 case CANCELLED:
305 case INTERRUPTED:
306 throw cancellationExceptionWithCause(
307 "Task was cancelled.", exception);
308
309 default:
310 throw new IllegalStateException(
311 "Error, synchronizer in invalid state: " + state);
312 }
313 }
314
315 /**
316 * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link
317 * INTERRUPTED}.
318 */
319 boolean isDone() {
320 return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
321 }
322
323 /**
324 * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}.
325 */
326 boolean isCancelled() {
327 return (getState() & (CANCELLED | INTERRUPTED)) != 0;
328 }
329
330 /**
331 * Checks if the state is {@link #INTERRUPTED}.
332 */
333 boolean wasInterrupted() {
334 return getState() == INTERRUPTED;
335 }
336
337 /**
338 * Transition to the COMPLETED state and set the value.
339 */
340 boolean set(@Nullable V v) {
341 return complete(v, null, COMPLETED);
342 }
343
344 /**
345 * Transition to the COMPLETED state and set the exception.
346 */
347 boolean setException(Throwable t) {
348 return complete(null, t, COMPLETED);
349 }
350
351 /**
352 * Transition to the CANCELLED or INTERRUPTED state.
353 */
354 boolean cancel(boolean interrupt) {
355 return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
356 }
357
358 /**
359 * Implementation of completing a task. Either {@code v} or {@code t} will
360 * be set but not both. The {@code finalState} is the state to change to
361 * from {@link #RUNNING}. If the state is not in the RUNNING state we
362 * return {@code false} after waiting for the state to be set to a valid
363 * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link
364 * #INTERRUPTED}).
365 *
366 * @param v the value to set as the result of the computation.
367 * @param t the exception to set as the result of the computation.
368 * @param finalState the state to transition to.
369 */
370 private boolean complete(@Nullable V v, @Nullable Throwable t,
371 int finalState) {
372 boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
373 if (doCompletion) {
374 // If this thread successfully transitioned to COMPLETING, set the value
375 // and exception and then release to the final state.
376 this.value = v;
377 // Don't actually construct a CancellationException until necessary.
378 this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
379 ? new CancellationException("Future.cancel() was called.") : t;
380 releaseShared(finalState);
381 } else if (getState() == COMPLETING) {
382 // If some other thread is currently completing the future, block until
383 // they are done so we can guarantee completion.
384 acquireShared(-1);
385 }
386 return doCompletion;
387 }
388 }
389
390 static final CancellationException cancellationExceptionWithCause(
391 @Nullable String message, @Nullable Throwable cause) {
392 CancellationException exception = new CancellationException(message);
393 exception.initCause(cause);
394 return exception;
395 }
396 }